处理分布式事务(SpringCloud Alibaba Seata)

您所在的位置:网站首页 springcloud client 处理分布式事务(SpringCloud Alibaba Seata)

处理分布式事务(SpringCloud Alibaba Seata)

#处理分布式事务(SpringCloud Alibaba Seata)| 来源: 网络整理| 查看: 265

前言

一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用,就会产生分布式事务问题

Seata是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。

Seata官网 下载

分布式事务过程:

分布式事务处理过程的一ID+三组件模型

Transaction ID XID:全局唯一的事务ID

3组件概念

Transaction Coordinator (TC):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚;

Transaction Manager ™:控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议;

Resource Manager (RM):控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚

处理过程:

TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID; XID 在微服务调用链路的上下文中传播; RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖; TM 向 TC 发起针对 XID 的全局提交或回滚决议; TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

在这里插入图片描述 使用:

在业务入口方法上开启 @GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class) 复制代码 Seata-Server安装

修改conf目录下的seata\conf\file.conf配置文件

主要修改:自定义事务组名称+事务日志存储模式为db+数据库连接信息

在这里插入图片描述 在这里插入图片描述 在这里插入图片描述

mysql5.7数据库新建库seata

在seata库,执行 \seata-server-0.9.0\seata\conf\db_store.sql

修改seata-server-0.9.0\seata\conf目录下的registry.conf配置文件 在这里插入图片描述

启动nacos,启动 seata

实际应用

这里我们会创建三个服务,一个订单服务,一个库存服务,一个账户服务。

当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存, 再通过远程调用账户服务来扣减用户账户里面的余额, 最后在订单服务中修改订单状态为已完成。

该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题。

业务流程:下订单->减库存->扣余额->改(订单)状态

seata_order:存储订单的数据库; seata_storage:存储库存的数据库; seata_account:存储账户信息的数据库。

seata_order库下建t_order订单表 seata_storage库下建t_storage库存表 seata_account库下建t_account 账户表

订单-库存-账户3个库下都需要建各自的回滚日志表,\seata-server-0.9.0\seata\conf目录下的db_undo_log.sql

这里只列举创建订单微服务,代码太多,已上传 gitee

新建Module:seata-order-service2001

pom

com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery com.alibaba.cloud spring-cloud-starter-alibaba-seata seata-all io.seata io.seata seata-all 0.9.0 org.springframework.cloud spring-cloud-starter-openfeign org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-actuator mysql mysql-connector-java 5.1.37 com.alibaba druid-spring-boot-starter 1.1.10 org.mybatis.spring.boot mybatis-spring-boot-starter 2.0.0 org.springframework.boot spring-boot-starter-test test org.projectlombok lombok true 复制代码

yml

server: port: 2001 spring: application: name: seata-order-service cloud: alibaba: seata: #自定义事务组名称需要与seata-server中的对应 tx-service-group: fsp_tx_group nacos: discovery: server-addr: localhost:8848 datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_order username: root password: 123456 feign: hystrix: enabled: false logging: level: io: seata: info mybatis: mapperLocations: classpath:mapper/*.xml 复制代码

resources/file.conf

transport { # tcp udt unix-domain-socket type = "TCP" #NIO NATIVE server = "NIO" #enable heartbeat heartbeat = true #thread factory for netty thread-factory { boss-thread-prefix = "NettyBoss" worker-thread-prefix = "NettyServerNIOWorker" server-executor-thread-prefix = "NettyServerBizHandler" share-boss-worker = false client-selector-thread-prefix = "NettyClientSelector" client-selector-thread-size = 1 client-worker-thread-prefix = "NettyClientWorkerThread" # netty boss thread size,will not be used for UDT boss-thread-size = 1 #auto default pin or 8 worker-thread-size = 8 } shutdown { # when destroy server, wait seconds wait = 3 } serialization = "seata" compressor = "none" } service { vgroup_mapping.fsp_tx_group = "default" #修改自定义事务组名称 default.grouplist = "127.0.0.1:8091" enableDegrade = false disable = false max.commit.retry.timeout = "-1" max.rollback.retry.timeout = "-1" disableGlobalTransaction = false } client { async.commit.buffer.limit = 10000 lock { retry.internal = 10 retry.times = 30 } report.retry.count = 5 tm.commit.retry.count = 1 tm.rollback.retry.count = 1 } ## transaction log store store { ## store mode: file、db mode = "db" ## file store file { dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions max-branch-session-size = 16384 # globe session size , if exceeded throws exceptions max-global-session-size = 512 # file buffer size , if exceeded allocate new buffer file-write-buffer-cache-size = 16384 # when recover batch read size session.reload.read_size = 100 # async, sync flush-disk-mode = async } ## database store db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. datasource = "dbcp" ## mysql/oracle/h2/oceanbase etc. db-type = "mysql" driver-class-name = "com.mysql.jdbc.Driver" url = "jdbc:mysql://127.0.0.1:3306/seata" #seata的数据库 user = "root" password = "123456" min-conn = 1 max-conn = 3 global.table = "global_table" branch.table = "branch_table" lock-table = "lock_table" query-limit = 100 } } lock { ## the lock store mode: local、remote mode = "remote" local { ## store locks in user's database } remote { ## store locks in the seata's server } } recovery { #schedule committing retry period in milliseconds committing-retry-period = 1000 #schedule asyn committing retry period in milliseconds asyn-committing-retry-period = 1000 #schedule rollbacking retry period in milliseconds rollbacking-retry-period = 1000 #schedule timeout retry period in milliseconds timeout-retry-period = 1000 } transaction { undo.data.validation = true undo.log.serialization = "jackson" undo.log.save.days = 7 #schedule delete expired undo_log in milliseconds undo.log.delete.period = 86400000 undo.log.table = "undo_log" } ## metrics settings metrics { enabled = false registry-type = "compact" # multi exporters use comma divided exporter-list = "prometheus" exporter-prometheus-port = 9898 } support { ## spring spring { # auto proxy the DataSource bean datasource.autoproxy = false } } 复制代码

resources/registry.conf

registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "nacos" ##注册类型 nacos { serverAddr = "localhost:8848" namespace = "" cluster = "default" } eureka { serviceUrl = "http://localhost:8761/eureka" application = "default" weight = "1" } redis { serverAddr = "localhost:6379" db = "0" } zk { cluster = "default" serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 } consul { cluster = "default" serverAddr = "127.0.0.1:8500" } etcd3 { cluster = "default" serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" application = "default" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" cluster = "default" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } } config { # file、nacos 、apollo、zk、consul、etcd3 type = "file" nacos { serverAddr = "localhost" namespace = "" } consul { serverAddr = "127.0.0.1:8500" } apollo { app.id = "seata-server" apollo.meta = "http://192.168.1.204:8801" } zk { serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } } 复制代码

OrderDao

@Mapper public interface OrderDao { /** * 创建订单 */ void create(Order order); /** * 修改订单金额 */ void update(@Param("userId") Long userId, @Param("status") Integer status); } 复制代码

resources/mapper/OrderMapper.xml

INSERT INTO `t_order` (`id`, `user_id`, `product_id`, `count`, `money`, `status`) VALUES (NULL, #{userId}, #{productId}, #{count}, #{money}, 0); UPDATE `t_order` SET status = 1 WHERE user_id = #{userId} AND status = #{status}; 复制代码

OrderService

public interface OrderService { /** * 创建订单 */ void create(Order order); } 复制代码

OrderServiceImpl

import com.atguigu.springcloud.alibaba.dao.OrderDao; import com.atguigu.springcloud.alibaba.domain.Order; import com.atguigu.springcloud.alibaba.service.AccountService; import com.atguigu.springcloud.alibaba.service.OrderService; import com.atguigu.springcloud.alibaba.service.StorageService; import io.seata.spring.annotation.GlobalTransactional; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Resource; @Service @Slf4j public class OrderServiceImpl implements OrderService { @Resource private OrderDao orderDao; @Resource private StorageService storageService; @Resource private AccountService accountService; /** * 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态 * 简单说: * 下订单->减库存->减余额->改状态 */ @Override @GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class) public void create(Order order) { log.info("------->下单开始"); //本应用创建订单 orderDao.create(order); //远程调用库存服务扣减库存 log.info("------->order-service中扣减库存开始"); storageService.decrease(order.getProductId(),order.getCount()); log.info("------->order-service中扣减库存结束"); //远程调用账户服务扣减余额 log.info("------->order-service中扣减余额开始"); accountService.decrease(order.getUserId(),order.getMoney()); log.info("------->order-service中扣减余额结束"); //修改订单状态为已完成 log.info("------->order-service中修改订单状态开始"); orderDao.update(order.getUserId(),0); log.info("------->order-service中修改订单状态结束"); log.info("------->下单结束"); } } 复制代码

StorageService

@FeignClient(value = "seata-storage-service") public interface StorageService { /** * 扣减库存 */ @PostMapping(value = "/storage/decrease") CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count); } 复制代码

AccountService

@FeignClient(value = "seata-account-service") public interface AccountService { /** * 扣减账户余额 */ //@RequestMapping(value = "/account/decrease", method = RequestMethod.POST, produces = "application/json; ") @PostMapping("/account/decrease") CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money); } 复制代码

controller

@RestController public class OrderController { @Autowired private OrderService orderService; /** * 创建订单 */ @GetMapping("/order/create") public CommonResult create( Order order) { orderService.create(order); return new CommonResult(200, "订单创建成功!"); } } 复制代码

config.MyBatisConfig.java

@Configuration @MapperScan({"com.atguigu.springcloud.alibaba.dao"}) public class MyBatisConfig { } 复制代码

config.DataSourceProxyConfig.java

import com.alibaba.druid.pool.DruidDataSource; import io.seata.rm.datasource.DataSourceProxy; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.transaction.SpringManagedTransactionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import javax.sql.DataSource; /** * @auther zzyy * @create 2019-12-11 16:58 * 使用Seata对数据源进行代理 */ @Configuration public class DataSourceProxyConfig { @Value("${mybatis.mapperLocations}") private String mapperLocations; @Bean @ConfigurationProperties(prefix = "spring.datasource") public DataSource druidDataSource(){ return new DruidDataSource(); } @Bean public DataSourceProxy dataSourceProxy(DataSource dataSource) { return new DataSourceProxy(dataSource); } @Bean public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSourceProxy); sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations)); sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory()); return sqlSessionFactoryBean.getObject(); } } 复制代码

主启动:

@EnableDiscoveryClient @EnableFeignClients @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)//取消数据源的自动创建 public class SeataOrderMainApp2001 { public static void main(String[] args) { SpringApplication.run(SeataOrderMainApp2001.class, args); } } 复制代码

库存、账户服务类似。

下订单入口添加了@GlobalTransactional,要么全执行,要么回滚,事务没啥可说的,下边主要看看实现的原理

补充

Simple Extensible Autonomous Transaction Architecture,简单可扩展自治事务框架

分布式事务的执行流程:

TM 开启分布式事务(TM 向 TC 注册全局事务记录); 按业务场景,编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态 ); TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务); TC 汇总事务信息,决定分布式事务是提交还是回滚; TC 通知所有 RM 提交/回滚 资源,事务二阶段结束。​

Seata有四大模式,默认AT模式 在这里插入图片描述 在这里插入图片描述 在一阶段,Seata 会拦截“业务 SQL”, 1 解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”, 2 执行“业务 SQL”更新业务数据,在业务数据更新之后, 3 其保存成“after image”,最后生成行锁。 以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

在这里插入图片描述 二阶段如是顺利提交的话, 因为“业务 SQL”在一阶段已经提交至数据库,所以Seata框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。 在这里插入图片描述 二阶段回滚: 二阶段如果是回滚的话,Seata 就需要回滚一阶段已经执行的“业务 SQL”,还原业务数据。 回滚方式便是用“before image”还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和 “after image”, 如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。 在这里插入图片描述 在下订单的方法打断点,此时去数据库看undo_log表,可以看到操作数据前后,seata都做了前置快照(before image)后置快照(after image),当正常提交或回滚后数据都会被删除,遇到脏写就需要人工处理

在这里插入图片描述

在这里插入图片描述 在这里插入图片描述 展开 rows,可以看到每个表执行事务前、执行事务后字段的值 在这里插入图片描述 当正常提交,回滚后这些记录都会被删除。

完整代码已上传:gitee



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3